62ddca638ae76918f3a0a5fac4ca8fbdb1c7fb9a,runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java,CreateStreamTest,testLateDataAccumulating,#,83

Before Change


    Pipeline p = pipelineRule.createPipeline();
    Instant instant = new Instant(0);
    CreateStream<TimestampedValue<Integer>> source =
        CreateStream.<TimestampedValue<Integer>>withBatchInterval(pipelineRule.batchDuration())
            .nextBatch()
            .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(6)))
            .nextBatch(
                TimestampedValue.of(1, instant),
                TimestampedValue.of(2, instant),
                TimestampedValue.of(3, instant))
            .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(20)))
            // These elements are late but within the allowed lateness
            .nextBatch(
                TimestampedValue.of(4, instant),
                TimestampedValue.of(5, instant))
            // These elements are droppably late
            .advanceNextBatchWatermarkToInfinity()
            .nextBatch(
                TimestampedValue.of(-1, instant),
                TimestampedValue.of(-2, instant),
                TimestampedValue.of(-3, instant));

    PCollection<Integer> windowed = p
        .apply(source).setCoder(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of()))
        .apply(ParDo.of(new OnlyValue<Integer>()))
        .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))).triggering(
            AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(2)))

After Change


    Pipeline p = pipelineRule.createPipeline();
    Instant instant = new Instant(0);
    CreateStream<Integer> source =
        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
            .emptyBatch()
            .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(6)))
            .nextBatch(
                TimestampedValue.of(1, instant),
                TimestampedValue.of(2, instant),
                TimestampedValue.of(3, instant))
            .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(20)))
            // These elements are late but within the allowed lateness
            .nextBatch(
                TimestampedValue.of(4, instant),
                TimestampedValue.of(5, instant))
            // These elements are droppably late
            .advanceNextBatchWatermarkToInfinity()
            .nextBatch(
                TimestampedValue.of(-1, instant),
                TimestampedValue.of(-2, instant),
                TimestampedValue.of(-3, instant));

    PCollection<Integer> windowed = p
        .apply(source)
        .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))).triggering(
            AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(2)))